
feat(rdb save): add blob compression on snapshot #505

Merged: 1 commit into main from compress_on_snapshot on Nov 29, 2022

Conversation

adiholden (Collaborator)

Signed-off-by: adi_holden <adi@dragonflydb.io>

@adiholden adiholden requested a review from romange November 21, 2022 12:44
@@ -773,13 +778,16 @@ class RdbSaver::Impl {
RdbSerializer meta_serializer_;
SliceSnapshot::RecordChannel channel_;
std::optional<AlignedBuffer> aligned_buf_;
bool enhanced_compression_;
Collaborator:

please add a comment saying what it means.

@@ -19,6 +22,22 @@ struct Entry;

class RdbSerializer;

class ZstdCompress {
public:
ZstdCompress() {
Collaborator:

maybe add a deleted copy constructor and operator= to prevent copying by chance.
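
A minimal sketch of the suggestion (assuming the class owns a raw ZSTD_CCtx* as in the snippet above; members and layout are illustrative):

#include <zstd.h>

class ZstdCompress {
 public:
  ZstdCompress() {
    cctx_ = ZSTD_createCCtx();
  }
  ~ZstdCompress() {
    ZSTD_freeCCtx(cctx_);
  }

  // Non-copyable: the object owns a raw ZSTD_CCtx*, so an accidental copy
  // would end in a double free.
  ZstdCompress(const ZstdCompress&) = delete;
  ZstdCompress& operator=(const ZstdCompress&) = delete;

 private:
  ZSTD_CCtx* cctx_;
};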

size_t channel_bytes_ = 0;
size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0, savecb_calls_ = 0;
uint64_t rec_id_ = 0;
uint32_t num_records_in_blob_ = 0;

bool enhanced_compression_ = false;
ZstdCompress compressor_;
Collaborator:

make it unique_ptr<ZstdCompress> to avoid allocating the context when compression is disabled.
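
One possible shape of the suggestion (a sketch only; the owning class and the MaybeInitCompressor helper are illustrative, and ZstdCompress is assumed to be the class from the snippet above):

#include <memory>

class ZstdCompress;  // defined elsewhere

class SliceSnapshot {
  // ...
  bool enhanced_compression_ = false;

  // Allocated lazily, only when enhanced compression is enabled, so the
  // default path never pays for the zstd context.
  std::unique_ptr<ZstdCompress> compressor_;

  void MaybeInitCompressor() {
    if (enhanced_compression_ && !compressor_) {
      compressor_ = std::make_unique<ZstdCompress>();
    }
  }
};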

size_t compressed_size =
ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(), str.data(), str.size(), 1);
if (compressed_size > str.size() * 0.85) {
++comperssion_no_effective_;
Collaborator:

comperssion -> compression

void SliceSnapshot::PushFileToChannel(io::StringFile* sfile, DbIndex db_index, unsigned num_records,
bool should_compress) {
string string_to_push = std::move(sfile->val);
if (should_compress && string_to_push.size() > 4096) {
Collaborator:

please introduce a named variable and maybe reduce its value to 256.
also please check if compression makes sense at all.
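
For example, with a named constant (the name and exact threshold are illustrative; 256 is the reviewer's suggested value):

// Blobs smaller than this are pushed uncompressed; compressing tiny blobs
// rarely pays off.
constexpr size_t kMinBlobSizeToCompress = 256;

if (should_compress && string_to_push.size() > kMinBlobSizeToCompress) {
  // compress string_to_push before pushing it to the channel
}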

@@ -19,6 +22,22 @@ struct Entry;

class RdbSerializer;

class ZstdCompress {
Collaborator:

nit: I would move this class definition into the .cc file and just forward-declare it here.
Why: to avoid bringing the "zstd.h" include into all the .cc files that include this header, since it's really an implementation detail. Also see my comment below about unique_ptr.
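
Roughly, the header would keep only a forward declaration while the full class (and the zstd include) lives in the .cc file; a sketch under that assumption:

// rdb_save.h -- no "zstd.h" include needed here.
class ZstdCompress;  // forward declaration; definition lives in the .cc file

// The owning class holds it by pointer, which works with an incomplete type:
//   std::unique_ptr<ZstdCompress> compressor_;
// (the owner's destructor must then be defined in the .cc file)

// rdb_save.cc
#include <zstd.h>

class ZstdCompress {
  // full definition; an implementation detail of this translation unit
};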

compr_buf_.reserve(buf_size);
}
size_t compressed_size =
ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(), str.data(), str.size(), 1);
Collaborator:

please introduce a flag snapshot_compression_level - it will be convenient for playing with compression.
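
For example (the flag name follows the suggestion; the default level of 1 mirrors the hard-coded value above):

#include "absl/flags/flag.h"

ABSL_FLAG(int32_t, snapshot_compression_level, 1,
          "zstd compression level used when compressing snapshot blobs");

// ...
size_t compressed_size =
    ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(), str.data(), str.size(),
                      absl::GetFlag(FLAGS_snapshot_compression_level));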

@adiholden adiholden force-pushed the compress_on_snapshot branch 2 times, most recently from 5b0acf1 to c76a42b Compare November 24, 2022 10:34
romange (Collaborator) left a comment:

Great work. Are you going to send test code in a separate PR?

// Push uncompressed blob to its membuf
uncompressed_mem_buf_.Reserve(uncompressed_blob.size());
IoBuf::Bytes dest = uncompressed_mem_buf_.AppendBuffer();
CHECK_GE(dest.size(), uncompressed_blob.size());
Collaborator:

please avoid check-failing as a result of bad data; please use error_code for this.

Collaborator:

Maybe my comment is wrong here. This cannot check-fail as a result of bad data; you are verifying the invariant, right?
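
To illustrate the distinction in this thread (a generic sketch, not the PR's code; kMaxAllowedBlobLen is hypothetical): CHECK is fine for invariants the code itself just established, while values decoded from the snapshot stream should be reported through an error_code.

// Invariant established by the Reserve() call right above: safe to CHECK.
uncompressed_mem_buf_.Reserve(uncompressed_blob.size());
IoBuf::Bytes dest = uncompressed_mem_buf_.AppendBuffer();
CHECK_GE(dest.size(), uncompressed_blob.size());

// Untrusted value decoded from the stream: return an error instead of crashing.
if (blob_len > kMaxAllowedBlobLen)
  return std::make_error_code(std::errc::message_size);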

uncompressed_mem_buf_.Reserve(uncompressed_blob.size());
IoBuf::Bytes dest = uncompressed_mem_buf_.AppendBuffer();
CHECK_GE(dest.size(), uncompressed_blob.size());
memcpy(dest.data(), uncompressed_blob.data(), uncompressed_blob.size());
romange (Collaborator) commented Nov 24, 2022:

there is some inefficiency here (double write to memory). maybe add a TODO to adjust the interfaces so that we avoid the redundant memory copy.

adiholden (Collaborator, Author):

I see, I am fixing this now

@@ -737,7 +743,7 @@ class RdbSaver::Impl {
public:
// We pass K=sz to say how many producers are pushing data in order to maintain
// correct closing semantics - channel is closing when K producers marked it as closed.
Impl(bool align_writes, unsigned producers_len, io::Sink* sink);
Impl(bool align_writes, unsigned producers_len, bool multi_entires_compression, io::Sink* sink);
Collaborator:

multi_entires_compression -> multi_entries_compression

src/server/rdb_save.h (comment resolved)
size_t channel_bytes_ = 0;
size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0, savecb_calls_ = 0;
uint64_t rec_id_ = 0;
uint32_t num_records_in_blob_ = 0;

bool multi_entires_compression_ = false;
Collaborator:

multi_entries_compression_

}
auto comp_res = zstd_serializer_->Compress(string_to_push);
if (comp_res.first) {
string_to_push = comp_res.second;
Collaborator:

string_to_push.swap(comp_res.second);

// Compress the string with end blob opcode
string to_compress(str.data(), str.size());
opcode = RDB_OPCODE_COMPRESSED_BLOB_END;
to_compress.append(reinterpret_cast<const char*>(&opcode), 1);
Collaborator:

to_compress.push_back(opcode);

src/server/rdb_load.cc (comment resolved)
#include "base/logging.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/rdb_extensions.h"
#include "server/snapshot.h"
#include "util/fibers/simple_channel.h"

ABSL_FLAG(bool, save_enhanced_compression, true, "DF save compressed blobs of multiple entries");
Collaborator:

there is a bug: when you run dragonfly with save_enhanced_compression=false, it does not compress strings with lzf as it did beforehand.

adiholden (Collaborator, Author):

Owww, not only that, but I also do double compression (lzf and zstd) when the flag=true.

romange (Collaborator) commented Nov 24, 2022:

Please see the profile below.
I ran "debug populate 50000000 key 550" and then "save df" while running dragonfly
with "--save_enhanced_compression=true".

[profile attachment]

@adiholden adiholden force-pushed the compress_on_snapshot branch 2 times, most recently from 77f0817 to a4b69b4 Compare November 27, 2022 13:45
producer_count = 0;
if (compression_mode == 2) {
Collaborator:

Let's keep it as RDB (no need for multi-entry here).

}

// Compress the string
string_view compresse_res = impl_->Compress(str);
Collaborator:

compresse_res -> compressed_res

// Write encoded compressed string len and than the compressed string
serialized_compressed_blob.append(reinterpret_cast<const char*>(buf), enclen);
serialized_compressed_blob.append(compresse_res);
return std::make_pair(true, serialized_compressed_blob);
Collaborator:

std::move(serialized_compressed_blob)

~ZstdCompressSerializer();

// Returns a pair consisting of an bool denoting whether the string was compressed
// and a string the result of compression. If givven string was not compressed returned
Collaborator:

givven -> given

@adiholden adiholden force-pushed the compress_on_snapshot branch from a4b69b4 to b52c0f0 Compare November 28, 2022 09:55
@adiholden adiholden requested a review from romange November 28, 2022 09:58
romange previously approved these changes Nov 28, 2022
Signed-off-by: adi_holden <adi@dragonflydb.io>
@adiholden adiholden merged commit 685b441 into main Nov 29, 2022
@romange romange deleted the compress_on_snapshot branch November 29, 2022 09:14